import argparse
import glob
import os
import sys
import time
import warnings
import json, codecs
import random
import matplotlib.pyplot as plt
from scipy.optimize import linear_sum_assignment as linear_assignment
import math
import cycler
from matplotlib import colors
from matplotlib import pyplot as plt
import matplotlib.patches as patches
import tensorflow as tf
# import humanfriendly
import numpy as np
import pandas as pd
from PIL import Image, ImageFile, ImageFont, ImageDraw
import statistics
from torch.utils.data import Dataset,DataLoader
import torch
# import tensorflow.compat.v1 as tf
# tf.disable_v2_behavior()
from tqdm import tqdm
%matplotlib inline
# from CameraTraps.ct_utils import truncate_float
print('TensorFlow version:', tf.__version__)
print('Is GPU available? tf.test.is_gpu_available:', tf.test.is_gpu_available())
def image_name_to_id(name):
return name.rstrip('.jpg')
def read_image(path):
with tf.io.gfile.GFile(path, 'rb') as f:
return np.array(Image.open(f))
def read_json(path):
with tf.io.gfile.GFile(path) as f:
return json.load(f)
def create_detection_map(annotations,mode="train"):
"""Creates a dict mapping IDs ---> detections."""
ann_map = {}
for image in annotations['images']:
if image['file'].split('/')[0] == mode:
ann_map[image['file'].split('/')[-1].rstrip('.jpg')] = image['detections']
return ann_map
# TRAIN
IMAGES_DIR_TRAIN = "/kaggle/input/iwildcam2022-fgvc9/train/train"
BOX_ANNOTATION_FILE = "/kaggle/input/iwildcam2022-fgvc9/metadata/metadata/iwildcam2022_mdv4_detections.json"
MASKS_DIR = "/kaggle/input/iwildcam2022-fgvc9/instance_masks/instance_masks"
images_train = tf.io.gfile.listdir(IMAGES_DIR_TRAIN)
# The annotations file contains annotations for all images in train and test
annotations = read_json(BOX_ANNOTATION_FILE)
detection_train_map = create_detection_map(annotations)
images_train_ids = list(detection_train_map.keys())
#TEST
IMAGES_DIR_TEST = "/kaggle/input/iwildcam2022-fgvc9/test/test"
images_test = tf.io.gfile.listdir(IMAGES_DIR_TEST)
detection_test_map = create_detection_map(annotations,mode="test")
images_test_ids = list(detection_test_map.keys())
print(f'length of detection map for train = {len(detection_train_map)}\nlength of detection map for test = {len(detection_test_map)}\n')
print(f'length of images for train = {len(images_train)}\nlength of images for test = {len(images_test)}\n')
print(f'total annotations from megaDetector model = {len(annotations["images"])}')
with codecs.open("../input/iwildcam2022-fgvc9/metadata/metadata/iwildcam2022_train_annotations.json", 'r',
encoding='utf-8', errors='ignore') as f:
train_meta = json.load(f)
with codecs.open("../input/iwildcam2022-fgvc9/metadata/metadata/iwildcam2022_test_information.json", 'r',
encoding='utf-8', errors='ignore') as f:
test_meta = json.load(f)
seq_test = pd.DataFrame(test_meta['images'])
#train_cat.columns = [ 'category_id', 'scientificName','family', 'genus']
display(seq_test)
seq_train = pd.DataFrame(train_meta['images'])
#train_cat.columns = [ 'category_id', 'scientificName','family', 'genus']
display(seq_train)
COLOR_CYCLER = cycler.cycler(color=['tab:blue', 'tab:green', 'tab:orange',
'tab:red', 'tab:purple'])
pd_example = seq_train[seq_train['seq_id'] == "30048d32-7d42-11eb-8fb5-0242ac1c0002"]
def get_image_annotation(image, detection_annotations, categories,instance_id_image,ax):
"""Plot boxes and mask annotations for a given image.
Args:
image: An image array of shape [H, W, 3]
detection_annotations: A list of detections. Each detection is a dict
containing the keys 'category', 'bbox' and 'conf'.
categories: A dict mapping category IDs to names.
instance_id_image: An array of shape [H, W] containing the instance ID
at each pixel. IDs are expected to be 1-indexed, with 0 reserved for
the background."""
cycle_iter = COLOR_CYCLER()
image_height, image_width = image.shape[:2]
ax.imshow(image)
for i, annotation in enumerate(detection_annotations):
xmin, ymin, width, height = annotation['bbox']
xmin *= image_width
ymin *= image_height
width *= image_width
height *= image_height
color = next(cycle_iter)['color']
rect = patches.Rectangle((xmin, ymin), width, height,
linewidth=3, edgecolor=color, facecolor='none')
ax.add_patch(rect)
label = '{}:{:.2f}'.format(categories[annotation['category']],
annotation['conf'])
ax.text(xmin, ymin - 5, label, fontsize=30, color='white',
bbox=dict(boxstyle='square,pad=0.0', facecolor=color, alpha=0.75,
ec='none'))
r, g, b, _ = colors.to_rgba(color)
color_array = np.array([r, g, b]).reshape(1, 1, 3)
color_image = np.ones((image_height, image_width, 3)) * color_array
mask = (instance_id_image == (i + 1)).astype(np.float32)[:, :, np.newaxis]
color_mask = np.concatenate([color_image, mask], axis=2)
ax.imshow(color_mask, alpha=0.5)
return color_mask
def show_images_seq(data,detections,train = True):
rows = data.shape[0]
cols = data.shape[1]
fig, axs = plt.subplots(rows, dpi=80, figsize=(rows*5, rows*5))
for i in range(rows):
image_name = data["file_name"][i]
image_path = os.path.join(IMAGES_DIR_TRAIN if train else IMAGES_DIR_TEST, image_name)
image_id = image_name_to_id(image_name)
mask_path = os.path.join(MASKS_DIR, f'{image_id}.png')
image = read_image(image_path)
if (image_id not in detections) or (len(detections[image_id]) == 0) or ( not tf.io.gfile.exists(mask_path)):
plt.title(f'{image_name} missing detection data.')
axs[i].imshow(image)
else:
detection_annotations = detections[image_id]
instance_id_image = read_image(mask_path)
image = get_image_annotation(image,detection_annotations,annotations['detection_categories'],instance_id_image,axs[i])
axs[i].axis("off")
plt.show()
show_images_seq(pd_example,detection_train_map)
def count_detections(row,detections):
image_id = row['file_name'].split('.')[0]
threshold = 0.95
count = 0
for bbox in detections[image_id]:
if bbox['conf'] > threshold:
count += 1
return count
def generate_zero_submission(seq_ids):
sub = pd.DataFrame(seq_ids, columns=['Id'])
sub['Predicted'] = 0
return sub
seq_test["detections_count"] = np.nan
for idx,row in tqdm(seq_test.iterrows()):
seq_test.at[idx, 'detections_count'] = count_detections(row,detection_test_map)
submission_res_by_max = generate_zero_submission(seq_test.seq_id.unique())
for seq_id in tqdm(seq_test.seq_id.unique()):
max_count = seq_test[seq_test.seq_id == seq_id]['detections_count'].max()
submission_res_by_max.loc[submission_res_by_max.Id == seq_id, 'Predicted'] = max_count
display(submission_res_by_max)
print(max(submission_res_by_max.Predicted))
submission_res_by_max.to_csv (r'res_by_max.csv', index = False, header=True)
Tracking in deep learning is the task of predicting the positions of objects throughout a video using their spatial as well as temporal features. More technically, Tracking is getting the initial set of detections, assigning unique ids, and tracking them throughout frames of the video (or sequence of frames) feed while maintaining the assigned ids. Tracking is generally a two-step process:
1. A detection module for target localization: The module responsible for detecting and localization of the object in the frame using some object detector like YOLOv4, CenterNet, etc (In our case- the detections is given part of db).
2. A motion predictor: This module is responsible for predicting the future motion of the object using its past information.
DeepSORT: arXiv:1703.07402
Centroid: centroid tracking
#######
class EuclideanDistTracker:
def __init__(self):
self.center_points = {}
self.id_count = 0
def update(self, objects_rect):
"""
Parameters:
-----------
object_rect: array of bounding box coordinates.
--------
Returns:
list containing [x,y,w,h,object_id].
x,y,w,h are the bounding box coordinates, and object_id is the id assigned to that particular bounding box.
--------
"""
# Objects boxes and ids
objects_bbs_ids = []
# Get center point of new object
for rect in objects_rect:
x, y, w, h = rect
cx,cy = (x+w)/2, (y+h)/2
# cx = (x + x + w) // 2 # Center x
# cy = (y + y + h) // 2 # Center y
# Find out if that object was detected already
same_object_detected = False
for id, pt in self.center_points.items():
dist = math.hypot(cx - pt[0], cy - pt[1])
if dist < 25:
self.center_points[id] = (cx, cy)
objects_bbs_ids.append([x, y, w, h, id])
same_object_detected = True
break
# New object is detected we assign the ID to that object
if same_object_detected is False:
self.center_points[self.id_count] = (cx, cy)
objects_bbs_ids.append([x, y, w, h, self.id_count])
self.id_count += 1
# Clean the dictionary by center points to remove IDS not used anymore
new_center_points = {}
for obj_bb_id in objects_bbs_ids:
_, _, _, _, object_id = obj_bb_id
center = self.center_points[object_id]
new_center_points[object_id] = center
# Update dictionary with IDs not used removed
self.center_points = new_center_points.copy()
return objects_bbs_ids
def get_detections(detections_dict,height,width):
detections = []
for diction in detections_dict:
if diction['conf'] > 0.95:
detections.append(diction['bbox'])
return detections
tracker = EuclideanDistTracker()
submission_res_by_tracks = generate_zero_submission(seq_test.seq_id.unique())
i=0
for seq_id in tqdm(seq_test.seq_id.unique()):
tracker.__init__()
seq_frames = seq_test[seq_test.seq_id == seq_id].sort_values(by=['seq_frame_num']).reset_index( drop = True)
for _,frame in seq_frames.iterrows():
detections = get_detections(detection_test_map[frame['id']], frame['height'],frame['width'])
boxes_ids = tracker.update(detections)
if i < 4 and tracker.id_count > 1 :
display(seq_frames)
show_images_seq(seq_frames,detection_test_map,train=False)
print(f'id_count = {tracker.id_count}')
i+=1
submission_res_by_tracks.loc[submission_res_by_tracks.Id == seq_id, 'Predicted'] = tracker.id_count
display(submission_res_by_tracks)
print(max(submission_res_by_tracks.Predicted))
submission_res_by_tracks.to_csv (r'res_by_tracks.csv', index = False, header=True)
Now with centroid-tracker model:
DEEP-SORT-TRACKER https://pypi.org/project/deep-sort-realtime/
Also, in order to get better performance - we can produce the identifications ourselves by our own model.
You can also try the deep sort tracker which uses a Kalman filter on the image sequences.
"""
This code taken from https://github.com/alcunha/iwildcam2021ufam
"""
!git clone https://github.com/Jordan-me/DeepSort_Tracker.git
from DeepSort_Tracker.deep_sort import nn_matching
from DeepSort_Tracker.deep_sort.detection import Detection
from DeepSort_Tracker.deep_sort.tracker import Tracker
max_cosine_distance = 0.2
nn_budget = None
def create_detections(annot, image_id):
"""
This method return array of Detection objec (see class above)
Parameters
----------
annot : dataframe_like of bounding boxes for given images
img_id : string desctibe image id
"""
detection_list = []
for bbox in annot[image_id]:
detection_list.append(Detection(bbox['bbox'],bbox['conf'],None))
return detection_list
def run_deepsort_on_seq(detections_list, frames_ids):
metric = nn_matching.NearestNeighborDistanceMetric("cosine", max_cosine_distance, nn_budget)
tracker = Tracker(metric)
confirmed_tracks = []
all_tracks = []
for frame_id, detections in zip(frames_ids, detections_list):
tracker.predict()
for i in detections:
print(i.tlwh)
tracker.update(detections)
for track in tracker.tracks:
if track.is_confirmed():
confirmed_tracks.append(track.track_id)
if track.time_since_update > 1:
continue
bbox = track.to_tlwh()
all_tracks.append([frame_id, track.track_id, bbox[0], bbox[1], bbox[2], bbox[3]])
confirmed_tracks = set(confirmed_tracks)
print(confirmed_tracks)
results = [track_bbox for track_bbox in all_tracks
if track_bbox[1] in confirmed_tracks]
print(results)
return results
def track_iwildcam(data_set, annotations):
tracks_list = []
for seq_id in data_set.seq_id.unique():
seq_info = data_set[data_set.seq_id == seq_id]
seq_info = seq_info.sort_values(by=['seq_frame_num']) # [1st frame, 2st frame ,...]
detections_list = []
frames_ids = []
for _, row in seq_info.iterrows(): # for each farme..
detections = create_detections(annotations, row['id'])
detections_list.append(detections)
frames_ids.append(row['id'])
results = run_deepsort_on_seq(detections_list, frames_ids)
for bbox in results:
bbox_info = {'seq_id': seq_id,
'img_id': bbox[0],
'track_id': seq_id + str(bbox[1]),
'bbox_tlwh': bbox[2:] }
tracks_list.append(bbox_info)
return tracks_list
# confirmed_tracks = track_iwildcam(seq_test, detection_test_map)
# tracker = DeepSort(max_age=30, nn_budget=70, override_track_class=None)